This notebook introduces the concept of stacked autoencoders. To build one we first need a class that implements a single Denoising Auto-Encoder layer.
An Auto-Encoder layer consists of an encoder and a decoder that learn the patterns of the input. The encoder maps the input x to a hidden representation y, which can be expressed mathematically as:
\begin{align} y &= s(Wx + b) \end{align}where s is, for example, the sigmoid function.
The decoder uses W', the inverse mapping, to produce z, the reconstruction of the input x:
\begin{align} z &= s(W'y + b') \end{align}One simple way of choosing W' is to tie it to the encoder weights:
\begin{align} W' &= W^T \end{align}With tied weights, this layer therefore has three parameters:
\begin{align} \{W, b, b'\} \end{align}The reconstruction error can be measured in many ways; a simple choice is the cross-entropy:
\begin{align} L_H(x,z) &= -\sum_{k=1}^d \left[ x_k \log(z_k) + (1-x_k) \log(1-z_k) \right] \end{align}With the scheme above, the Auto-Encoder would merely learn the identity mapping of the input data. It is much more advantageous for the Auto-Encoder to extract the most representative features of the data.
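As a quick illustration of these formulas, the following NumPy-only sketch (separate from the Theano class below; the layer sizes and the random inputs are purely illustrative) computes the hidden representation, the tied-weight reconstruction and the cross-entropy loss for a small batch:
In [ ]:
import numpy as np

def sigmoid(a):
    return 1.0 / (1.0 + np.exp(-a))

rng = np.random.RandomState(0)
x = rng.rand(5, 784)                    # 5 illustrative inputs with values in [0, 1]
W = rng.uniform(-0.1, 0.1, (784, 500))  # encoder weights W
b = np.zeros(500)                       # hidden bias b
b_prime = np.zeros(784)                 # visible bias b'

y = sigmoid(x.dot(W) + b)               # y = s(Wx + b)
z = sigmoid(y.dot(W.T) + b_prime)       # z = s(W'y + b') with tied weights W' = W^T

# cross-entropy L_H(x, z), summed over components and averaged over the batch
L = -np.sum(x * np.log(z) + (1 - x) * np.log(1 - z), axis=1)
print(L.mean())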
A simple technique is to introduce distortions, that is, to corrupt the input in some controlled way, for example by deleting some of the input values (setting them to zero). In Theano this can be done with the function theano_rng.binomial, which returns a vector of 1s and 0s used to mask/corrupt the input data.
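The sketch below mimics this masking with NumPy's binomial sampler (the 30% corruption level is just an illustrative value): each input component is kept with probability 1 - corruption_level and zeroed out otherwise.
In [ ]:
import numpy as np

rng = np.random.RandomState(0)
x = rng.rand(3, 8)          # a tiny illustrative input batch
corruption_level = 0.3

# mask entries are 1 with probability 1 - corruption_level, 0 otherwise
mask = rng.binomial(n=1, p=1 - corruption_level, size=x.shape)
x_tilde = mask * x          # corrupted input: masked components are set to zero
print(x_tilde)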
In [ ]:
import numpy
import theano
import theano.tensor as T
from theano.tensor.shared_randomstreams import RandomStreams


class dA(object):
"""Denoising Auto-Encoder class (dA)
"""
def __init__(
self,
numpy_rng,
theano_rng=None,
input=None,
n_visible=784,
n_hidden=500,
W=None,
bhid=None,
bvis=None
):
self.n_visible = n_visible
self.n_hidden = n_hidden
# create a Theano random generator that gives symbolic random values
if not theano_rng:
theano_rng = RandomStreams(numpy_rng.randint(2 ** 30))
# note : W' was written as `W_prime` and b' as `b_prime`
if not W:
initial_W = numpy.asarray(
numpy_rng.uniform(
low=-4 * numpy.sqrt(6. / (n_hidden + n_visible)),
high=4 * numpy.sqrt(6. / (n_hidden + n_visible)),
size=(n_visible, n_hidden)
),
dtype=theano.config.floatX
)
W = theano.shared(value=initial_W, name='W', borrow=True)
if not bvis:
bvis = theano.shared(
value=numpy.zeros(
n_visible,
dtype=theano.config.floatX
),
borrow=True
)
if not bhid:
bhid = theano.shared(
value=numpy.zeros(
n_hidden,
dtype=theano.config.floatX
),
name='b',
borrow=True
)
self.W = W
# b corresponds to the bias of the hidden
self.b = bhid
# b_prime corresponds to the bias of the visible
self.b_prime = bvis
# tied weights, therefore W_prime is W transpose
self.W_prime = self.W.T
self.theano_rng = theano_rng
# if no input is given, generate a variable representing the input
if input is None:
# we use a matrix because we expect a minibatch of several
# examples, each example being a row
self.x = T.dmatrix(name='input')
else:
self.x = input
self.params = [self.W, self.b, self.b_prime]
    def get_corrupted_input(self, input, corruption_level):
        """Randomly set a fraction ``corruption_level`` of the input to zero."""
        return self.theano_rng.binomial(size=input.shape, n=1,
p=1 - corruption_level,
dtype=theano.config.floatX) * input
def get_hidden_values(self, input):
""" Computes the values of the hidden layer """
return T.nnet.sigmoid(T.dot(input, self.W) + self.b)
def get_reconstructed_input(self, hidden):
"""Computes the reconstructed input given the values of the
hidden layer
"""
return T.nnet.sigmoid(T.dot(hidden, self.W_prime) + self.b_prime)
def get_cost_updates(self, corruption_level, learning_rate):
""" This function computes the cost and the updates for one trainng
step of the dA """
tilde_x = self.get_corrupted_input(self.x, corruption_level)
y = self.get_hidden_values(tilde_x)
z = self.get_reconstructed_input(y)
L = - T.sum(self.x * T.log(z) + (1 - self.x) * T.log(1 - z), axis=1)
cost = T.mean(L)
# compute the gradients of the cost of the `dA` with respect
# to its parameters
gparams = T.grad(cost, self.params)
# generate the list of updates
updates = [
(param, param - learning_rate * gparam)
for param, gparam in zip(self.params, gparams)
]
return (cost, updates)
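As a standalone illustration (not required for the stacked network that follows; the layer sizes, random seed and learning rate here are arbitrary choices), the dA class can be trained on its own by compiling its cost and updates into a Theano function:
In [ ]:
# Minimal standalone usage sketch of the dA class
x = T.matrix('x')
rng = numpy.random.RandomState(1234)
da = dA(numpy_rng=rng, input=x, n_visible=400, n_hidden=200)
cost, updates = da.get_cost_updates(corruption_level=0.2, learning_rate=0.1)
train_da = theano.function([x], cost, updates=updates)
# each call train_da(batch) performs one gradient step on a minibatch (rows = examples)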
In the previous section we prepared the class needed to include a Denoising Auto-Encoder (DaE) in our networks.
The example we propose in this notebook is an MLP with two hidden layers and an output layer (Logistic Regression).
This is the network onto which we will add the DaE layers, that is, a plain MLP:
And the resulting network obtained after including the corresponding DaE layers is the following:
This new network becomes a Stacked Auto-Encoders (SaE) network.
In this particular example we will classify 20x20 pixel images of handwritten digits from 0 to 9. The training data are stored in the MATLAB file digits.mat.
In [ ]:
import scipy.io as io
import numpy
import matplotlib.pyplot as plt
import theano
import theano.tensor as T
from theano.tensor.shared_randomstreams import RandomStreams
from mlp import CapaOculta, LogisticRegression
from dA import dA
In [ ]:
class SdA_MLP_Layer(object):
def __init__(self, input, numpy_rng, theano_rng=None, n_ins=784, hidden_layers_size=500, corruption_level=.1):
        # Symbolic input tensor for this layer
self.input = input
if not theano_rng:
theano_rng = RandomStreams(numpy_rng.randint(2 ** 30))
        # Sigmoid hidden layer (the MLP view of this layer)
self.sigmoid_layer = CapaOculta(rng=numpy_rng, input=input, n_in=n_ins, n_out=hidden_layers_size, activation=T.nnet.sigmoid)
        # Denoising auto-encoder that shares W and the hidden bias with the sigmoid layer
self.dA_layer = dA(numpy_rng=numpy_rng, theano_rng=theano_rng, input=input, n_visible=n_ins, n_hidden=hidden_layers_size, W=self.sigmoid_layer.W, bhid=self.sigmoid_layer.b)
self.params = self.sigmoid_layer.params
self.output = self.sigmoid_layer.output
def pretraining(self, train_input, batch_size):
        # Symbolic variables for the corruption level, the learning rate and the minibatch index
corruption_level = T.scalar('corruption')
learning_rate = T.scalar('lr')
index = T.iscalar('index')
        # Compute the minibatch boundaries
batch_begin = index * batch_size
batch_end = batch_begin + batch_size
        # Build the cost and the parameter updates of the dA layer
cost, updates = self.dA_layer.get_cost_updates(corruption_level, learning_rate)
fn = theano.function(
inputs=[
index,
theano.Param(corruption_level, default=0.2),
theano.Param(learning_rate, default=0.1)],
outputs=cost,
updates=updates,
givens={self.input: train_input[batch_begin:batch_end]})
return fn
def getOut(self, inputData, num_inputs):
index = T.iscalar('index')
batch_begin = index * num_inputs
batch_end = (index+1) * num_inputs
        # num_inputs is typically the full dataset size, so salida(0) below returns the outputs for all examples
salida = theano.function(
inputs=[index],
outputs=self.output,
givens={self.input: inputData[batch_begin:batch_end]})
return salida(0)
In [ ]:
class SdA_MLP_network(object):
def __init__(self, input, output, hidden_layers_sizes=[500, 500], corruption_levels=[0.1, 0.1], n_outs=10, n_ins=400):
numpy_rng = numpy.random.RandomState(89677)
self.input = input
self.output = output
self.corruption_levels = corruption_levels
self.layer0 = SdA_MLP_Layer(
input=input,
numpy_rng=numpy_rng,
n_ins=n_ins,
hidden_layers_size=hidden_layers_sizes[0],
corruption_level=corruption_levels[0])
self.layer1 = SdA_MLP_Layer(
input=self.layer0.output,
numpy_rng=numpy_rng,
n_ins=hidden_layers_sizes[0],
hidden_layers_size=hidden_layers_sizes[1],
corruption_level=corruption_levels[1])
self.log_layer = LogisticRegression(
input=self.layer1.output,
n_in=hidden_layers_sizes[-1],
n_out=n_outs)
self.params = self.log_layer.params + self.layer0.params + self.layer1.params
self.finetune_cost = self.log_layer.negative_log_likelihood(output)
self.errors = self.log_layer.errors(output)
def pretraining(self, inputData, batch_size, pretraining_epochs, pretrain_lr):
n_train_batches = inputData.get_value(borrow=True).shape[0] / batch_size
fn0 = self.layer0.pretraining(inputData, batch_size)
for epoch in xrange(pretraining_epochs):
c = []
for batch_index in xrange(n_train_batches):
c.append(fn0(
index=batch_index,
corruption=self.corruption_levels[0],
lr=pretrain_lr))
            print 'Pre-training layer 0, epoch %d, cost %f' % (epoch, numpy.mean(c))
output = self.layer0.getOut(inputData, inputData.get_value().shape[0])
out = theano.shared(numpy.asarray(output, dtype=theano.config.floatX), borrow=True)
fn1 = self.layer1.pretraining(out, batch_size)
for epoch in xrange(pretraining_epochs):
c = []
for batch_index in xrange(n_train_batches):
c.append(fn1(
index=batch_index,
corruption=self.corruption_levels[1],
lr=pretrain_lr))
            print 'Pre-training layer 1, epoch %d, cost %f' % (epoch, numpy.mean(c))
def fine_tune(self, inputData, outputData, batch_size, learning_rate):
        # Minibatch index
index = T.iscalar('index')
gparams = T.grad(self.finetune_cost, self.params)
updates = [(param, param - gparam * learning_rate) for param, gparam in zip(self.params, gparams)]
train_fn = theano.function(
inputs=[index],
outputs=self.finetune_cost,
updates=updates,
givens={
self.input: inputData[index * batch_size:(index + 1) * batch_size],
self.output: outputData[index * batch_size:(index + 1) * batch_size]}, name='train')
return train_fn
def training(self, inputData, outputData, batch_size, n_epochs, learning_rate):
print "...entrenando"
n_train_batches = inputData.get_value(borrow=True).shape[0] / batch_size
train_fn = self.fine_tune(inputData, outputData, batch_size, learning_rate)
epoch = 0
        # Per-epoch history of the average cost, used for the plot below
        epoca = []
        coste = []
while (epoch < n_epochs):
epoch = epoch + 1
minibatch_avg_cost = 0
for minibatch_index in xrange(n_train_batches):
minibatch_avg_cost = minibatch_avg_cost + train_fn(minibatch_index)
print 'Training MLP, epoch %d, cost %f' % (epoch, minibatch_avg_cost/n_train_batches)
coste.append(minibatch_avg_cost/n_train_batches)
epoca.append(epoch)
plt.plot(epoca, coste)
plt.show()
def test(self, inputData, outputData, batch_size):
index = T.iscalar('index')
n_train_batches = inputData.get_value(borrow=True).shape[0] / batch_size
test_fn = theano.function(
inputs=[index],
outputs=self.errors,
givens={
self.input: inputData[index * batch_size:(index + 1) * batch_size],
self.output: outputData[index * batch_size:(index + 1) * batch_size]}, name='test')
def test_score():
return [test_fn(i) for i in xrange(n_train_batches)]
        # Print the classification accuracy: 1 minus the mean error over all batches
        print 1. - numpy.mean(test_score())
In [ ]:
finetune_lr = 0.1
pretraining_epochs = 100
corruption_levels = [.1, .2]
pretrain_lr = 0.001
training_epochs = 200
dataset = 'digits.mat'
batch_size = 500
print "...cargando datos"
data = io.loadmat(dataset, squeeze_me=True)
dataIn = data['X'].astype(float)
dataOut = data['y'].astype(int)
# In digits.mat the digit 0 is stored with label 10, so remap it back to 0
for i in range(dataOut.shape[0]):
if (dataOut[i] == 10):
dataOut[i] = 0
train_set_x = theano.shared(numpy.asarray(dataIn, dtype=theano.config.floatX), borrow=True)
train_set_y = T.cast(theano.shared(numpy.asarray(dataOut, dtype=theano.config.floatX), borrow=True), 'int32')
n_train_batches = train_set_x.get_value(borrow=True).shape[0]
n_train_batches /= batch_size
x = T.matrix('x')  # Input data
y = T.ivector('y')  # Expected labels
sda = SdA_MLP_network(
x,
y,
n_outs=10,
n_ins=20 * 20)
sda.pretraining(train_set_x, batch_size, pretraining_epochs, pretrain_lr)
sda.training(train_set_x, train_set_y, batch_size, training_epochs, finetune_lr)
sda.test(train_set_x, train_set_y, batch_size)
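To get a feel for what the pre-trained DaE layers have learned, the first-layer weights can be displayed as 20x20 images. This is a sketch; it assumes, as in the weight tying above, that CapaOculta exposes its weight matrix as the shared variable W.
In [ ]:
# Plot the first 10 learned features of layer 0 as 20x20 images
W0 = sda.layer0.sigmoid_layer.W.get_value(borrow=True)
fig, axes = plt.subplots(2, 5)
for i, ax in enumerate(axes.ravel()):
    ax.imshow(W0[:, i].reshape(20, 20), cmap='gray')
    ax.axis('off')
plt.show()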